<?php
declare (strict_types = 1);

namespace app\command;

use app\api\services\PayService;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use think\console\Command;
use think\console\Input;
use think\console\Output;

class RabbitMQOrderCallback extends Command
{
    protected function configure()
    {
        // 指令配置
        $this->setName('rabbitmq:order:callback')
            ->setDescription('the Order:callback command');
    }

    /**
     * @throws \ErrorException
     */
    protected function execute(Input $input, Output $output)
    {
        $config     = config('rabbitmq.hosts');
        $connection = new AMQPStreamConnection(
            $config['host'],
            $config['port'],
            $config['username'],
            $config['password'],
            $config['vhost']
        );
        $channel = $connection->channel();

        $channel->basic_qos(0, 1, false);
        $conf = config('rabbitmq.order_callback_queue');
        /**
         * 创建交换机(Exchange)
         * name: vckai_exchange// 交换机名称
         * type: direct        // 交换机类型，分别为direct/fanout/topic，参考另外文章的Exchange Type说明。
         * passive: false      // 如果设置true存在则返回OK，否则就报错。设置false存在返回OK，不存在则自动创建
         * durable: false      // 是否持久化，设置false是存放到内存中的，RabbitMQ重启后会丢失
         * auto_delete: false  // 是否自动删除，当最后一个消费者断开连接之后队列是否自动被删除
         */
        $channel->exchange_declare(
            $conf['exchange_name'],
            'direct',
            false,
            true,
            false
        );
        // var_dump($conf);exit;
        $channel->queue_declare($conf['queue_name'],
            false,
            true,
            false,
            false,
            false,
            []
        );
        //将队列名与交换器名进行绑定，并指定routing_key
        $channel->queue_bind(
            $conf['queue_name'],
            $conf['exchange_name'],
            $conf['route_key']
        );

        $callback = function ($msg) {
            echo " [x] Received " . date('Y-m-d H:i:s'), $msg->body, "\n";
            // 处理接收到的消息
            $data = $msg->body ? json_decode($msg->body, true) : [];
            PayService::getInstance()->payFormOrder($data);
            //手动确认ack，确保消息已经处理
            $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
        };

        $channel->basic_consume($conf['queue_name']
            , '', false, false, false, false, $callback);

        while (count($channel->callbacks)) {
            $channel->wait();
        }

        $channel->close();
        $connection->close();
    }
}
